
[GLUTEN-7745][VL] Incorporate SQL Union operator into Velox execution pipeline #7842

Merged: 18 commits merged into apache:main on Dec 3, 2024

Conversation

@zhztheplayer (Member) commented Nov 7, 2024

#7745

Edit: The single-threaded execution issue will be fixed in Velox by facebookincubator/velox@e80bf12.

Introduces a new config, spark.gluten.sql.native.union (default: false), which works for the Velox backend only at the moment.

Native union will not be used when the children of the union operator don't share the same known partition number (this could perhaps be fixed in the future).
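
To illustrate, a minimal sketch of enabling the feature (only the config key comes from this PR; the app name, tables, and query below are hypothetical placeholders):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: turn on Gluten's native union for the Velox backend.
val spark = SparkSession
  .builder()
  .appName("native-union-demo")
  .config("spark.gluten.sql.native.union", "true")
  .getOrCreate()

// Both inputs must share the same known partition number; otherwise
// Gluten keeps the vanilla Spark union.
spark.sql("SELECT id FROM t1 UNION ALL SELECT id FROM t2").show()
```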

There is a Spark UT that fails when native union is enabled; it is related to Velox LocalPartition output ordering and requires a fix in the future:

2024-11-29T12:48:14.0003314Z - SPARK-14393: values generated by non-deterministic functions shouldn't change after coalesce or union *** FAILED ***
2024-11-29T12:48:14.0004432Z   Array([0,0], [2,1], [1,8589934592], [3,8589934593]) did not equal Array([0,0], [1,8589934592], [2,0], [3,8589934592]) Values changed after union when codegenFallback=true and wholeStage=false. (DataFrameFunctionsSuite.scala:3780)

So far the failure is benign, since SQL generally does not guarantee the output ordering of a union operator.
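
To make the ordering sensitivity concrete, a hypothetical repro sketch (this is not the actual UT, which is the SPARK-14393 test in DataFrameFunctionsSuite):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id

// Sketch only: ids come from a non-deterministic function, so if the
// union's output order changes (e.g., via Velox LocalPartition
// interleaving its inputs), the collected rows can differ from what
// vanilla Spark produces, even though the multiset of rows is the same.
val spark = SparkSession.builder().appName("union-order-demo").getOrCreate()
val df = spark.range(2).select(monotonically_increasing_id().as("id"))
val unioned = df.union(df)
unioned.collect().foreach(println) // order is not guaranteed across engines
```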

This was initially a PoC and not workable because of a blocker in Velox's single-threaded execution design (since addressed; see the Edit above):

I20241107 15:06:23.310680 1849235 VeloxPlanConverter.cc:128] Plan Node: 
-- Aggregation[7][PARTIAL n7_0 := count_partial("n0_0")] -> n7_0:BIGINT
  -- LocalPartition[6][GATHER] -> n0_0:BIGINT
    -- Project[4][expressions: (n0_0:BIGINT, "n1_1")] -> n0_0:BIGINT
      -- Project[1][expressions: (n1_1:BIGINT, "n0_0")] -> n1_1:BIGINT
        -- TableScan[0][table: hive_table] -> n0_0:BIGINT
    -- Project[5][expressions: (n0_0:BIGINT, "n3_1")] -> n0_0:BIGINT
      -- Project[3][expressions: (n3_1:BIGINT, "n2_0")] -> n3_1:BIGINT
        -- TableScan[2][table: hive_table] -> n2_0:BIGINT
24/11/07 15:06:23 ERROR TaskResources: Task 11 failed by error: 
org.apache.gluten.exception.GlutenException: Task doesn't support single thread execution: -- Aggregation[7]

	at org.apache.gluten.vectorized.PlanEvaluatorJniWrapper.nativeCreateKernelWithIterator(Native Method)
	at org.apache.gluten.vectorized.NativePlanEvaluator.createKernelWithBatchIterator(NativePlanEvaluator.java:66)
	at org.apache.gluten.backendsapi.velox.VeloxIteratorApi.genFirstStageIterator(VeloxIteratorApi.scala:214)
	at org.apache.gluten.execution.GlutenWholeStageColumnarRDD.$anonfun$compute$1(GlutenWholeStageColumnarRDD.scala:88)
	at org.apache.gluten.utils.Arm$.withResource(Arm.scala:25)
	at org.apache.gluten.metrics.GlutenTimeMetric$.millis(GlutenTimeMetric.scala:37)
	at org.apache.gluten.execution.GlutenWholeStageColumnarRDD.compute(GlutenWholeStageColumnarRDD.scala:77)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Related Velox code:

https://github.com/facebookincubator/velox/blob/12b52e70ec85ae0cdb4aa990797cddda9be5be27/velox/exec/Driver.h#L658-L660

@github-actions github-actions bot added CORE works for Gluten Core VELOX labels Nov 7, 2024
@zhztheplayer zhztheplayer changed the title PoC: [VL] Incorporate SQL Union operator into Velox execution pipeline PoC: [GLUTEN-7745][VL] Incorporate SQL Union operator into Velox execution pipeline Nov 7, 2024
github-actions bot commented Nov 7, 2024

Thanks for opening a pull request!

Could you open an issue for this pull request on GitHub Issues?

https://github.com/apache/incubator-gluten/issues

Then could you also rename the commit message and pull request title in the following format?

[GLUTEN-${ISSUES_ID}][COMPONENT]feat/fix: ${detailed message}

github-actions bot commented repeatedly (Nov 7 – Dec 2, 2024): Run Gluten Clickhouse CI on x86

@zhztheplayer zhztheplayer marked this pull request as ready for review December 2, 2024 00:22
@zhztheplayer zhztheplayer changed the title PoC: [GLUTEN-7745][VL] Incorporate SQL Union operator into Velox execution pipeline [GLUTEN-7745][VL] Incorporate SQL Union operator into Velox execution pipeline Dec 2, 2024

@PHILO-HE (Contributor) left a comment

Looks good. Some minor comments.

expected = df.collect()
}
// By default we will fallabck complex type scan but here we should allow
// By default, we will fallabck complex type scan but here we should allow
PHILO-HE (Contributor):

Please also fix the typo: fallabck.

zhztheplayer (Member, author):

fixed

/**
 * run a query with native engine as well as vanilla spark then compare the result set for
 * correctness check
 */
protected def compareResultsAgainstVanillaSpark(
    sqlStr: String,
protected def compareDfResultsAgainstVanillaSpark(
PHILO-HE (Contributor):

Does Df mean DataFrame? The naming seems unfriendly to readers.

zhztheplayer (Member, author):

Let's just use DF? It's a common abbreviation of DataFrame in Spark. I think we should avoid making names too long.

PHILO-HE (Contributor):

@zhztheplayer, I see. It's OK to use your proposed name for now. Thanks!

}
}

private def sameNumPartitions(plans: Seq[SparkPlan]): Boolean = {
PHILO-HE (Contributor):

Can we move this into the validate function below?

zhztheplayer (Member, author):

Its purpose is to make the code self-explanatory. Say, a reader will know we are comparing the partition numbers of the children because they see sameNumPartitions as the method name.
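
For illustration only, a minimal sketch of what such a helper could look like (the actual body in the PR may differ):

```scala
import org.apache.spark.sql.execution.SparkPlan

// Sketch: native union is only considered when every child reports the
// same number of output partitions. The real check in the PR may differ.
private def sameNumPartitions(plans: Seq[SparkPlan]): Boolean = {
  val numPartitions = plans.map(_.outputPartitioning.numPartitions)
  numPartitions.distinct.size == 1
}
```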

bool validateInputTypes(const ::substrait::extensions::AdvancedExtension& extension, std::vector<TypePtr>& types);
/// Used to get types from advanced extension and validate them, then convert to a Velox type that has arbitrary
/// levels of nesting.
bool validateInputVeloxType(const ::substrait::extensions::AdvancedExtension& extension, TypePtr& out);
PHILO-HE (Contributor):

validateInputVeloxType makes me feel that the input is a Velox type. Can we use some other name, e.g., parseToVeloxType?

zhztheplayer (Member, author):

Changing to parseVeloxType. Thanks.


Comment on lines +271 to +273
// FIXME: The whole metrics system in gluten-substrait is magic. Passing metrics trees through JNI with a trivial
// array is possible but requires for a solid design. Apparently we haven't had it. All the code requires complete
// rework.
zhztheplayer (Member, author) commented Dec 2, 2024:

I found it's really painful to add new metrics in the Gluten Velox backend. A set of metrics from the Velox query plan is converted to a tree or an array again and again while being passed to the Spark/Java side. A lot of magical IDs are used in the tree/array traversal algorithms, with a lot of special handling of corner cases, e.g., join, filter-project, and the union added in this PR.

The whole metrics system should be reworked (if someone could lead this work); otherwise it's unmaintainable.
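
As a thought experiment on the "trivial array" idea mentioned in the FIXME, here is a hypothetical sketch (these types and functions do not exist in Gluten; they only show that a pre-order encoding with child counts can round-trip a metrics tree across a flat boundary such as JNI):

```scala
// Hypothetical metrics node; Gluten's real metrics classes differ.
final case class MetricsNode(name: String, value: Long, children: Seq[MetricsNode])

// Pre-order flatten: each node is emitted with its child count, which is
// enough to rebuild the tree on the other side of the boundary.
def flatten(root: MetricsNode): Seq[(String, Long, Int)] =
  (root.name, root.value, root.children.size) +: root.children.flatMap(flatten)

// Inverse of flatten: consume the flat sequence recursively, using the
// stored child counts to recover the tree shape without magic IDs.
def rebuild(flat: Seq[(String, Long, Int)]): MetricsNode = {
  def go(i: Int): (MetricsNode, Int) = {
    val (name, value, numChildren) = flat(i)
    var next = i + 1
    val children = (0 until numChildren).map { _ =>
      val (child, after) = go(next)
      next = after
      child
    }
    (MetricsNode(name, value, children), next)
  }
  go(0)._1
}
```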


@zhztheplayer zhztheplayer merged commit 6dd91ba into apache:main Dec 3, 2024
49 checks passed
Labels: CLICKHOUSE, CORE (works for Gluten Core), VELOX